package com.xiaofan.ct.consumer.dao;

import com.xiaofan.ct.common.bean.BaseDao;
import com.xiaofan.ct.common.constant.Names;
import com.xiaofan.ct.common.constant.ValueConstant;
import com.xiaofan.ct.consumer.bean.CallLog;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Data access object that writes call-log records into HBase.
 */
public class HbaseDao extends BaseDao {

    /** Separator between the fields of a raw call-log line. */
    private static final String FIELD_SEPARATOR = "\t";

    /** Number of fields a raw call-log line must contain: call1, call2, callTime, duration. */
    private static final int FIELD_COUNT = 4;

    /** Row-key suffix and "flg" column value used for the caller-side record. */
    private static final String FLAG_CALLER = "1";

    /** Row-key suffix and "flg" column value used for the callee-side record. */
    private static final String FLAG_CALLEE = "0";

    /**
     * Initializes the HBase schema: invokes {@code createNamespaceNX} for the namespace and
     * {@code createTableXX} for the call-log table with the caller and callee column families.
     *
     * <p>{@code end()} is invoked in a {@code finally} block so that whatever {@code start()}
     * set up is released even when namespace or table creation fails.
     *
     * <p>NOTE(review): the table is registered with {@code InsertCalleeCoprocessor}. If that
     * coprocessor already writes the callee row, {@link #insertData(String)} may write it a
     * second time; confirm against the coprocessor implementation.
     *
     * @throws IOException if one of the underlying HBase operations fails
     */
    public void init() throws IOException {
        start();
        try {
            createNamespaceNX(Names.NAMESPACE.getValue());
            createTableXX(
                    Names.TABLE.getValue(),
                    "com.xiaofan.ct.consumer.coprocessor.InsertCalleeCoprocessor",
                    ValueConstant.REGION_COUNT,
                    Names.CF_CALLER.getValue(),
                    Names.CF_CALLEE.getValue());
        } finally {
            end();
        }
    }

    /**
     * Inserts a call-log object.
     *
     * <p>The row key is computed as
     * {@code regionNum_call1_callTime_call2_duration_1}, stored on the object, and the object
     * is then handed to {@code putData}.
     *
     * @param log the call log to store; its row key is overwritten by this method
     * @throws IOException            if writing to HBase fails
     * @throws IllegalAccessException declared by the underlying {@code putData(Object)} call
     */
    public void insertData(CallLog log) throws IOException, IllegalAccessException {
        String rowKey = genRegionNum(log.getCall1(), log.getCallTime())
                + "_" + log.getCall1()
                + "_" + log.getCallTime()
                + "_" + log.getCall2()
                + "_" + log.getDuration()
                + "_1";
        log.setRowKey(rowKey);
        putData(log);
    }

    /**
     * Parses a raw, tab-separated call-log line and stores it in HBase as two rows: one keyed
     * by the caller (flag {@code 1}, caller column family) and one keyed by the callee
     * (flag {@code 0}, callee column family, with call1/call2 swapped).
     *
     * @param value a line of the form {@code call1\tcall2\tcallTime\tduration}
     * @throws IOException              if writing to HBase fails
     * @throws IllegalArgumentException if {@code value} is null or has fewer than four fields
     */
    public void insertData(String value) throws IOException {
        if (value == null) {
            throw new IllegalArgumentException("call log line must not be null");
        }

        String[] values = value.split(FIELD_SEPARATOR);
        if (values.length < FIELD_COUNT) {
            // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                    "malformed call log line: expected at least " + FIELD_COUNT
                            + " tab-separated fields but found " + values.length);
        }

        String call1 = values[0];
        String call2 = values[1];
        String callTime = values[2];
        String duration = values[3];

        // Caller-side record: owned by call1.
        Put callerPut = callRecordPut(
                Bytes.toBytes(Names.CF_CALLER.getValue()), call1, call2, callTime, duration, FLAG_CALLER);

        // Callee-side record: owned by call2, so the two numbers are swapped.
        Put calleePut = callRecordPut(
                Bytes.toBytes(Names.CF_CALLEE.getValue()), call2, call1, callTime, duration, FLAG_CALLEE);

        List<Put> puts = new ArrayList<>(2);
        puts.add(callerPut);
        puts.add(calleePut);
        putData(Names.TABLE.getValue(), puts);
    }

    /**
     * Builds the {@link Put} for one side of a call record.
     *
     * <p>Row key layout: {@code regionNum_owner_callTime_peer_duration_flag}.
     *
     * <p>Row key design notes:
     * <ol>
     *   <li>Length: at most 64KB, 10~100 bytes recommended, ideally a multiple of 8; keep it as
     *       short as possible because long row keys hurt performance.</li>
     *   <li>Uniqueness: every row key must be unique.</li>
     *   <li>Distribution:
     *     <ul>
     *       <li>salting: never use a timestamp directly as the row key, prefix it with a
     *           random number;</li>
     *       <li>string reversal, e.g. 1312312334342 vs. 1312312334345
     *           (phone number: 183 + 1062 + 9526);</li>
     *       <li>computing a partition number, in the manner of a hash map.</li>
     *     </ul>
     *   </li>
     * </ol>
     *
     * @param family   column family (as bytes) that receives all columns of this record
     * @param owner    number this row belongs to; written to the "call1" column
     * @param peer     the other party of the call; written to the "call2" column
     * @param callTime call time; written to the "callTime" column
     * @param duration call duration; written to the "duration" column
     * @param flag     row-key suffix, also written to the "flg" column
     * @return the populated {@link Put}
     */
    private Put callRecordPut(byte[] family, String owner, String peer,
                              String callTime, String duration, String flag) {
        String rowKey = genRegionNum(owner, callTime)
                + "_" + owner
                + "_" + callTime
                + "_" + peer
                + "_" + duration
                + "_" + flag;

        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(family, Bytes.toBytes("call1"), Bytes.toBytes(owner));
        put.addColumn(family, Bytes.toBytes("call2"), Bytes.toBytes(peer));
        put.addColumn(family, Bytes.toBytes("callTime"), Bytes.toBytes(callTime));
        put.addColumn(family, Bytes.toBytes("duration"), Bytes.toBytes(duration));
        put.addColumn(family, Bytes.toBytes("flg"), Bytes.toBytes(flag));
        return put;
    }
}
